今天會講到的是,除了抓取資料的邏輯之外,要如何可以把這個過程自動化呢?
在經過一段時間的研究後,除了常見的土砲 cron job 的方式觸發 webhook 的方法之外,我發現了一個應用彈性而且使用上很快速就可以理解跟上手的 GCP Workflows
以下是來自官方的介紹:
GCP Workflows 是 Google Cloud 提供的一個工作流程自動化服務,可讓您使用 Google Cloud 服務和 API 來建構可靠的應用程式、自動化處理程序,以及資料和機器學習管道。
Workflows 使用簡單的 YAML 語言來定義工作流程,您可以使用它來串連 Google Cloud 中的各種服務,例如 Cloud Storage、Cloud SQL、Cloud Functions 等。Workflows 還提供許多高階功能,例如條件控制、重試、回退等,可讓您更輕鬆地設計複雜的工作流程。
在簡單看過文件之後,Workflows 提供了近乎全面的 GCP serverless 服務的串聯,只要是可以使用 HTTP request 或是呼叫 google api 的服務都可以被納入進來。
從圖中可以看到這次利用了 Cloud Scheduler
去定時觸發 Job 的發佈,之後組合 Workflow
跟 Cloud Task
去組合一個簡單的 queue ,可以簡單設定 QPS 跟 concurrent limit
Podcast feed list:
這邊為了測試方便,先將清單輸出成固定的 json
[
"https://feeds.simplecast.com/54nAGcIl",
"https://www.thisamericanlife.org/podcast/rss.xml",
"https://feeds.simplecast.com/PxEW_ipK",
"https://anchor.fm/s/599522d0/podcast/rss",
"https://anchor.fm/s/8c1524bc/podcast/rss",
"https://rss.art19.com/-exposed-",
"https://podcastfeeds.nbcnews.com/RPWEjhKq",
"https://podcastfeeds.nbcnews.com/dateline",
"https://feeds.megaphone.fm/strike-force-five",
"https://rss.art19.com/48-hours",
"https://feeds.transistor.fm/the-laravel-podcast",
"https://www.spreaker.com/show/4070003/episodes/feed"
]
Publisher workflow(podcast-tasks-publisher)
main:
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- project_number: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
- location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
- workflow_child_name: "podcast-tasks-handler"
- queue_name: "podcast-tasks-dispatcher"
- get_message:
call: http.get
args:
url: https://ithome.yhhbill.dev/podcast_feeds.json
result: podcast_feeds
- enqueue_tasks_to_execute_child_workflow:
for:
in: ${podcast_feeds.body}
index: i
value: podcast_feed_link
steps:
- iterate:
assign:
- data:
podcast_feed_link: ${podcast_feed_link}
- exec:
# Need to wrap into argument for Workflows args.
argument: ${json.encode_to_string(data)}
- create_task_to_execute_child_workflow:
call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
args:
parent: ${"projects/" + project_id + "/locations/" + location + "/queues/" + queue_name}
body:
task:
httpRequest:
body: ${base64.encode(json.encode(exec))}
url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_child_name + "/executions"}
oauthToken:
serviceAccountEmail: ${project_number + "-compute@developer.gserviceaccount.com"}
- return_value:
return: ${podcast_feeds}
Flowchart
Handler workflow(podcast-tasks-handler)
main:
params: [args]
steps:
- init:
assign:
- podcast_feed_link : ${args.podcast_feed_link}
- step_A:
call: http.get
args:
url: https://podcast-crawler-galinwm43q-uw.a.run.app/update
body:
podcast_feed_link: ${podcast_feed_link}
auth:
type: OIDC
audience: https://podcast-crawler-galinwm43q-uw.a.run.app/update
result: the_response
- return_message:
return: ${the_response.body}
podcast-tasks-dispatcher
我們可以在 podcast-tasks-publisher
這個 workflow 手動執行。
下面就是執行完成的 log